Book a Demo!
CoCalc Logo Icon
Store · Features · Docs · Share · Support · News · About · Policies · Sign Up · Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/skeleton.rs
8422 views
1
#![allow(unused)] // TODO: remove me
2
use std::cmp::Reverse;
3
use std::time::{Duration, Instant};
4
5
use parking_lot::Mutex;
6
use polars_core::POOL;
7
use polars_core::prelude::*;
8
use polars_expr::planner::{ExpressionConversionState, create_physical_expr, get_expr_depth_limit};
9
use polars_plan::plans::{IR, IRPlan};
10
use polars_plan::prelude::AExpr;
11
use polars_plan::prelude::expr_ir::ExprIR;
12
use polars_utils::arena::{Arena, Node};
13
use polars_utils::relaxed_cell::RelaxedCell;
14
use slotmap::{SecondaryMap, SlotMap};
15
16
use crate::graph::{Graph, GraphNodeKey};
17
use crate::metrics::GraphMetrics;
18
use crate::physical_plan::{PhysNode, PhysNodeKey, PhysNodeKind, StreamingLowerIRContext};
19
20
/// Executes the IR with the streaming engine.
21
///
22
/// Unsupported operations can fall back to the in-memory engine.
23
///
24
/// Returns:
25
/// - `Ok(QueryResult::Single(DataFrame))` when collecting to a single sink.
26
/// - `Ok(QueryResult::Multiple(Vec<DataFrame>))` when collecting to multiple sinks.
27
/// - `Err` if the IR can't be executed.
28
///
29
/// Returned `DataFrame`s contain data only for memory sinks,
30
/// `DataFrame`s corresponding to file sinks are empty.
31
pub fn run_query(
32
node: Node,
33
ir_arena: &mut Arena<IR>,
34
expr_arena: &mut Arena<AExpr>,
35
) -> PolarsResult<QueryResult> {
36
StreamingQuery::build(node, ir_arena, expr_arena)?.execute()
37
}
38
39
/// Visualizes the physical plan as a dot graph.
40
pub fn visualize_physical_plan(
41
node: Node,
42
ir_arena: &mut Arena<IR>,
43
expr_arena: &mut Arena<AExpr>,
44
) -> PolarsResult<String> {
45
let mut phys_sm = SlotMap::with_capacity_and_key(ir_arena.len());
46
47
let ctx = StreamingLowerIRContext {
48
prepare_visualization: true,
49
};
50
let root_phys_node =
51
crate::physical_plan::build_physical_plan(node, ir_arena, expr_arena, &mut phys_sm, ctx)?;
52
53
let out = crate::physical_plan::visualize_plan(root_phys_node, &phys_sm, expr_arena);
54
55
Ok(out)
56
}
57
58
pub struct StreamingQuery {
59
top_ir: IR,
60
pub graph: Graph,
61
pub root_phys_node: PhysNodeKey,
62
pub phys_sm: SlotMap<PhysNodeKey, PhysNode>,
63
pub phys_to_graph: SecondaryMap<PhysNodeKey, GraphNodeKey>,
64
pub metrics: Option<Arc<Mutex<GraphMetrics>>>,
65
}
66
67
/// Configures if IR lowering creates the `format_str` for `InMemoryMap`.
68
pub static PREPARE_VISUALIZATION_DATA: RelaxedCell<bool> = RelaxedCell::new_bool(false);
69
70
/// Sets config to ensure IR lowering always creates the `format_str` for `InMemoryMap`.
71
pub fn always_prepare_visualization_data() {
72
PREPARE_VISUALIZATION_DATA.store(true);
73
}
74
75
fn cfg_prepare_visualization_data() -> bool {
76
if !PREPARE_VISUALIZATION_DATA.load() {
77
PREPARE_VISUALIZATION_DATA.fetch_or(
78
std::env::var("POLARS_STREAM_ALWAYS_PREPARE_VISUALIZATION_DATA").as_deref() == Ok("1"),
79
);
80
}
81
82
PREPARE_VISUALIZATION_DATA.load()
83
}
84
85
impl StreamingQuery {
86
pub fn build(
87
node: Node,
88
ir_arena: &mut Arena<IR>,
89
expr_arena: &mut Arena<AExpr>,
90
) -> PolarsResult<Self> {
91
if let Ok(visual_path) = std::env::var("POLARS_VISUALIZE_IR") {
92
let plan = IRPlan {
93
lp_top: node,
94
lp_arena: ir_arena.clone(),
95
expr_arena: expr_arena.clone(),
96
};
97
let visualization = plan.display_dot().to_string();
98
std::fs::write(visual_path, visualization).unwrap();
99
}
100
let mut phys_sm = SlotMap::with_capacity_and_key(ir_arena.len());
101
let ctx = StreamingLowerIRContext {
102
prepare_visualization: cfg_prepare_visualization_data(),
103
};
104
let root_phys_node = crate::physical_plan::build_physical_plan(
105
node,
106
ir_arena,
107
expr_arena,
108
&mut phys_sm,
109
ctx,
110
)?;
111
if let Ok(visual_path) = std::env::var("POLARS_VISUALIZE_PHYSICAL_PLAN") {
112
let visualization =
113
crate::physical_plan::visualize_plan(root_phys_node, &phys_sm, expr_arena);
114
std::fs::write(visual_path, visualization).unwrap();
115
}
116
117
let (mut graph, phys_to_graph) =
118
crate::physical_plan::physical_plan_to_graph(root_phys_node, &phys_sm, expr_arena)?;
119
120
let top_ir = ir_arena.get(node).clone();
121
122
let metrics = if std::env::var("POLARS_TRACK_METRICS").as_deref() == Ok("1")
123
|| std::env::var("POLARS_LOG_METRICS").as_deref() == Ok("1")
124
{
125
crate::async_executor::track_task_metrics(true);
126
Some(Arc::default())
127
} else {
128
None
129
};
130
131
let out = StreamingQuery {
132
top_ir,
133
graph,
134
root_phys_node,
135
phys_sm,
136
phys_to_graph,
137
metrics,
138
};
139
140
Ok(out)
141
}
142
143
pub fn execute(self) -> PolarsResult<QueryResult> {
144
let StreamingQuery {
145
top_ir,
146
mut graph,
147
root_phys_node,
148
phys_sm,
149
phys_to_graph,
150
metrics,
151
} = self;
152
153
let query_start = Instant::now();
154
let mut results = crate::execute::execute_graph(&mut graph, metrics.clone())?;
155
let query_elapsed = query_start.elapsed();
156
157
// Print metrics.
158
if let Some(lock) = metrics
159
&& std::env::var("POLARS_LOG_METRICS").as_deref() == Ok("1")
160
{
161
let mut total_query_ns = 0;
162
let mut lines = Vec::new();
163
let m = lock.lock();
164
for phys_node_key in phys_sm.keys() {
165
let Some(graph_node_key) = phys_to_graph.get(phys_node_key) else {
166
continue;
167
};
168
let Some(node_metrics) = m.get(*graph_node_key) else {
169
continue;
170
};
171
let name = graph.nodes[*graph_node_key].compute.name();
172
let total_ns =
173
node_metrics.total_poll_time_ns + node_metrics.total_state_update_time_ns;
174
let total_time = Duration::from_nanos(total_ns);
175
let poll_time = Duration::from_nanos(node_metrics.total_poll_time_ns);
176
let update_time = Duration::from_nanos(node_metrics.total_state_update_time_ns);
177
let max_poll_time = Duration::from_nanos(node_metrics.max_poll_time_ns);
178
let max_update_time = Duration::from_nanos(node_metrics.max_state_update_time_ns);
179
let total_polls = node_metrics.total_polls;
180
let total_updates = node_metrics.total_state_updates;
181
let perc_stolen = node_metrics.total_stolen_polls as f64
182
/ node_metrics.total_polls as f64
183
* 100.0;
184
185
let rows_received = node_metrics.rows_received;
186
let morsels_received = node_metrics.morsels_received;
187
let max_received = node_metrics.largest_morsel_received;
188
let rows_sent = node_metrics.rows_sent;
189
let morsels_sent = node_metrics.morsels_sent;
190
let max_sent = node_metrics.largest_morsel_sent;
191
192
let io_total_active_time = Duration::from_nanos(node_metrics.io_total_active_ns);
193
let io_total_bytes_requested = node_metrics.io_total_bytes_requested;
194
let io_total_bytes_received = node_metrics.io_total_bytes_received;
195
let io_total_bytes_sent = node_metrics.io_total_bytes_sent;
196
197
lines.push(
198
(total_time, format!(
199
"{name}: tot({total_time:.2?}), \
200
poll({poll_time:.2?}, n={total_polls}, max={max_poll_time:.2?}, stolen={perc_stolen:.1}%), \
201
update({update_time:.2?}, n={total_updates}, max={max_update_time:.2?}), \
202
recv(row={rows_received}, morsel={morsels_received}, max={max_received}), \
203
sent(row={rows_sent}, morsel={morsels_sent}, max={max_sent}), \
204
io(\
205
total_active_time={io_total_active_time:.2?}, \
206
total_bytes_requested={io_total_bytes_requested}, \
207
total_bytes_received={io_total_bytes_received}, \
208
total_bytes_sent={io_total_bytes_sent})"))
209
);
210
211
total_query_ns += total_ns;
212
}
213
lines.sort_by_key(|(tot, _)| Reverse(*tot));
214
215
let total_query_time = Duration::from_nanos(total_query_ns);
216
eprintln!(
217
"Streaming query took {query_elapsed:.2?} ({total_query_time:.2?} CPU), detailed breakdown:"
218
);
219
for (_tot, line) in lines {
220
eprintln!("{line}");
221
}
222
eprintln!();
223
}
224
225
match top_ir {
226
IR::SinkMultiple { inputs } => {
227
let phys_node = &phys_sm[root_phys_node];
228
let PhysNodeKind::SinkMultiple { sinks } = phys_node.kind() else {
229
unreachable!();
230
};
231
232
Ok(QueryResult::Multiple(
233
sinks
234
.iter()
235
.map(|phys_node_key| {
236
results
237
.remove(phys_to_graph[*phys_node_key])
238
.unwrap_or_else(DataFrame::empty)
239
})
240
.collect(),
241
))
242
},
243
_ => Ok(QueryResult::Single(
244
results
245
.remove(phys_to_graph[root_phys_node])
246
.unwrap_or_else(DataFrame::empty),
247
)),
248
}
249
}
250
}
251
252
pub enum QueryResult {
253
Single(DataFrame),
254
/// Collected to multiple in-memory sinks
255
Multiple(Vec<DataFrame>),
256
}
257
258
impl QueryResult {
259
pub fn unwrap_single(self) -> DataFrame {
260
use QueryResult::*;
261
match self {
262
Single(df) => df,
263
Multiple(_) => panic!(),
264
}
265
}
266
267
pub fn unwrap_multiple(self) -> Vec<DataFrame> {
268
use QueryResult::*;
269
match self {
270
Single(_) => panic!(),
271
Multiple(dfs) => dfs,
272
}
273
}
274
}
275
276