Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/skeleton.rs
6939 views
1
#![allow(unused)] // TODO: remove me
2
use std::cmp::Reverse;
3
4
use polars_core::POOL;
5
use polars_core::prelude::*;
6
use polars_expr::planner::{ExpressionConversionState, create_physical_expr, get_expr_depth_limit};
7
use polars_plan::plans::{Context, IR, IRPlan};
8
use polars_plan::prelude::AExpr;
9
use polars_plan::prelude::expr_ir::ExprIR;
10
use polars_utils::arena::{Arena, Node};
11
use slotmap::{SecondaryMap, SlotMap};
12
13
use crate::graph::{Graph, GraphNodeKey};
14
use crate::physical_plan::{PhysNode, PhysNodeKey, PhysNodeKind, StreamingLowerIRContext};
15
16
/// Executes the IR with the streaming engine.
17
///
18
/// Unsupported operations can fall back to the in-memory engine.
19
///
20
/// Returns:
21
/// - `Ok(QueryResult::Single(DataFrame))` when collecting to a single sink.
22
/// - `Ok(QueryResult::Multiple(Vec<DataFrame>))` when collecting to multiple sinks.
23
/// - `Err` if the IR can't be executed.
24
///
25
/// Returned `DataFrame`s contain data only for memory sinks,
26
/// `DataFrame`s corresponding to file sinks are empty.
27
pub fn run_query(
28
node: Node,
29
ir_arena: &mut Arena<IR>,
30
expr_arena: &mut Arena<AExpr>,
31
) -> PolarsResult<QueryResult> {
32
StreamingQuery::build(node, ir_arena, expr_arena)?.execute()
33
}
34
35
/// Visualizes the physical plan as a dot graph.
36
pub fn visualize_physical_plan(
37
node: Node,
38
ir_arena: &mut Arena<IR>,
39
expr_arena: &mut Arena<AExpr>,
40
) -> PolarsResult<String> {
41
let mut phys_sm = SlotMap::with_capacity_and_key(ir_arena.len());
42
43
let ctx = StreamingLowerIRContext {
44
prepare_visualization: true,
45
};
46
let root_phys_node =
47
crate::physical_plan::build_physical_plan(node, ir_arena, expr_arena, &mut phys_sm, ctx)?;
48
49
let out = crate::physical_plan::visualize_plan(root_phys_node, &phys_sm, expr_arena);
50
51
Ok(out)
52
}
53
54
pub struct StreamingQuery {
55
top_ir: IR,
56
graph: Graph,
57
root_phys_node: PhysNodeKey,
58
phys_sm: SlotMap<PhysNodeKey, PhysNode>,
59
phys_to_graph: SecondaryMap<PhysNodeKey, GraphNodeKey>,
60
}
61
62
impl StreamingQuery {
63
pub fn build(
64
node: Node,
65
ir_arena: &mut Arena<IR>,
66
expr_arena: &mut Arena<AExpr>,
67
) -> PolarsResult<Self> {
68
if let Ok(visual_path) = std::env::var("POLARS_VISUALIZE_IR") {
69
let plan = IRPlan {
70
lp_top: node,
71
lp_arena: ir_arena.clone(),
72
expr_arena: expr_arena.clone(),
73
};
74
let visualization = plan.display_dot().to_string();
75
std::fs::write(visual_path, visualization).unwrap();
76
}
77
let mut phys_sm = SlotMap::with_capacity_and_key(ir_arena.len());
78
let ctx = StreamingLowerIRContext {
79
prepare_visualization: false,
80
};
81
let root_phys_node = crate::physical_plan::build_physical_plan(
82
node,
83
ir_arena,
84
expr_arena,
85
&mut phys_sm,
86
ctx,
87
)?;
88
if let Ok(visual_path) = std::env::var("POLARS_VISUALIZE_PHYSICAL_PLAN") {
89
let visualization =
90
crate::physical_plan::visualize_plan(root_phys_node, &phys_sm, expr_arena);
91
std::fs::write(visual_path, visualization).unwrap();
92
}
93
94
let (mut graph, phys_to_graph) =
95
crate::physical_plan::physical_plan_to_graph(root_phys_node, &phys_sm, expr_arena)?;
96
97
let top_ir = ir_arena.get(node).clone();
98
99
let out = StreamingQuery {
100
top_ir,
101
graph,
102
root_phys_node,
103
phys_sm,
104
phys_to_graph,
105
};
106
107
Ok(out)
108
}
109
110
pub fn execute(self) -> PolarsResult<QueryResult> {
111
let StreamingQuery {
112
top_ir,
113
mut graph,
114
root_phys_node,
115
phys_sm,
116
phys_to_graph,
117
} = self;
118
119
crate::async_executor::clear_task_wait_statistics();
120
let mut results = crate::execute::execute_graph(&mut graph)?;
121
122
if std::env::var("POLARS_TRACK_WAIT_STATS").as_deref() == Ok("1") {
123
let mut stats = crate::async_executor::get_task_wait_statistics();
124
stats.sort_by_key(|(_l, w)| Reverse(*w));
125
eprintln!("Time spent waiting for async tasks:");
126
for (loc, wait_time) in stats {
127
eprintln!("{}:{} - {:?}", loc.file(), loc.line(), wait_time);
128
}
129
}
130
131
match top_ir {
132
IR::SinkMultiple { inputs } => {
133
let phys_node = &phys_sm[root_phys_node];
134
let PhysNodeKind::SinkMultiple { sinks } = phys_node.kind() else {
135
unreachable!();
136
};
137
138
Ok(QueryResult::Multiple(
139
sinks
140
.iter()
141
.map(|phys_node_key| {
142
results
143
.remove(phys_to_graph[*phys_node_key])
144
.unwrap_or_else(DataFrame::empty)
145
})
146
.collect(),
147
))
148
},
149
_ => Ok(QueryResult::Single(
150
results
151
.remove(phys_to_graph[root_phys_node])
152
.unwrap_or_else(DataFrame::empty),
153
)),
154
}
155
}
156
}
157
158
pub enum QueryResult {
159
Single(DataFrame),
160
/// Collected to multiple in-memory sinks
161
Multiple(Vec<DataFrame>),
162
}
163
164
impl QueryResult {
165
pub fn unwrap_single(self) -> DataFrame {
166
use QueryResult::*;
167
match self {
168
Single(df) => df,
169
Multiple(_) => panic!(),
170
}
171
}
172
173
pub fn unwrap_multiple(self) -> Vec<DataFrame> {
174
use QueryResult::*;
175
match self {
176
Single(_) => panic!(),
177
Multiple(dfs) => dfs,
178
}
179
}
180
}
181
182