// Source: pola-rs/polars (GitHub) -
// crates/polars-stream/src/physical_plan/visualization/models.rs
use std::num::NonZeroUsize;
2
3
use polars_io::utils::sync_on_close::SyncOnCloseType;
4
use polars_ops::frame::MaintainOrderJoin;
5
use polars_ops::prelude::{JoinCoalesce, JoinValidation};
6
use polars_plan::dsl::PredicateFileSkip;
7
use polars_plan::dsl::sink2::FileProviderType;
8
use polars_utils::IdxSize;
9
use polars_utils::pl_str::PlSmallStr;
10
11
use crate::physical_plan::ZipBehavior;
12
13
#[derive(serde::Serialize, serde::Deserialize, Debug)]
14
#[cfg_attr(
15
feature = "physical_plan_visualization_schema",
16
derive(schemars::JsonSchema)
17
)]
18
pub struct PhysicalPlanVisualizationData {
19
pub title: PlSmallStr,
20
/// Number of nodes from the start of `nodes` that are root nodes.
21
pub num_roots: u64,
22
pub nodes: Vec<PhysNodeInfo>,
23
pub edges: Vec<Edge>,
24
}
25
26
impl PhysicalPlanVisualizationData {
27
pub fn to_json(&self) -> polars_error::PolarsResult<String> {
28
serde_json::to_string(self).map_err(polars_error::to_compute_err)
29
}
30
}
31
32
#[derive(serde::Serialize, serde::Deserialize, Debug, Default)]
33
#[cfg_attr(
34
feature = "physical_plan_visualization_schema",
35
derive(schemars::JsonSchema)
36
)]
37
pub struct PhysNodeInfo {
38
pub id: u64,
39
pub title: PlSmallStr,
40
pub properties: PhysNodeProperties,
41
}
42
43
#[derive(serde::Serialize, serde::Deserialize, Debug, Default)]
44
#[cfg_attr(
45
feature = "physical_plan_visualization_schema",
46
derive(schemars::JsonSchema)
47
)]
48
pub struct Edge {
49
pub source: u64,
50
pub target: u64,
51
}
52
53
impl Edge {
54
pub fn new<T, U>(source: T, target: U) -> Self
55
where
56
u64: TryFrom<T> + TryFrom<U>,
57
<u64 as TryFrom<T>>::Error: std::fmt::Debug,
58
<u64 as TryFrom<U>>::Error: std::fmt::Debug,
59
{
60
Self {
61
source: source.try_into().unwrap(),
62
target: target.try_into().unwrap(),
63
}
64
}
65
}
66
67
#[derive(serde::Serialize, serde::Deserialize)]
68
#[serde(tag = "type")]
69
#[derive(Default, Debug, strum_macros::IntoStaticStr)]
70
#[cfg_attr(
71
feature = "physical_plan_visualization_schema",
72
derive(schemars::JsonSchema)
73
)]
74
pub enum PhysNodeProperties {
75
#[default]
76
Default,
77
CallbackSink {
78
callback_function: PlSmallStr,
79
maintain_order: bool,
80
chunk_size: Option<NonZeroUsize>,
81
},
82
DynamicSlice,
83
FileSink {
84
target: PlSmallStr,
85
file_format: PlSmallStr,
86
sync_on_close: SyncOnCloseType,
87
maintain_order: bool,
88
mkdir: bool,
89
cloud_options: bool,
90
},
91
Filter {
92
predicate: PlSmallStr,
93
},
94
GatherEvery {
95
n: u64,
96
offset: u64,
97
},
98
GroupBy {
99
keys: Vec<PlSmallStr>,
100
aggs: Vec<PlSmallStr>,
101
},
102
#[cfg(feature = "dynamic_group_by")]
103
DynamicGroupBy {
104
index_column: PlSmallStr,
105
period: PlSmallStr,
106
every: PlSmallStr,
107
offset: PlSmallStr,
108
start_by: PlSmallStr,
109
label: PlSmallStr,
110
include_boundaries: bool,
111
closed_window: PlSmallStr,
112
aggs: Vec<PlSmallStr>,
113
slice: Option<(u64, u64)>,
114
},
115
#[cfg(feature = "dynamic_group_by")]
116
RollingGroupBy {
117
index_column: PlSmallStr,
118
period: PlSmallStr,
119
offset: PlSmallStr,
120
closed_window: PlSmallStr,
121
slice: Option<(u64, u64)>,
122
aggs: Vec<PlSmallStr>,
123
},
124
SortedGroupBy {
125
key: PlSmallStr,
126
aggs: Vec<PlSmallStr>,
127
slice: Option<(IdxSize, IdxSize)>,
128
},
129
InMemoryMap {
130
format_str: PlSmallStr,
131
},
132
InMemorySink,
133
InMemorySource {
134
n_rows: u64,
135
schema_names: Vec<PlSmallStr>,
136
},
137
InputIndependentSelect {
138
selectors: Vec<PlSmallStr>,
139
},
140
// Joins
141
CrossJoin {
142
maintain_order: MaintainOrderJoin,
143
suffix: Option<PlSmallStr>,
144
},
145
EquiJoin {
146
how: PlSmallStr,
147
left_on: Vec<PlSmallStr>,
148
right_on: Vec<PlSmallStr>,
149
nulls_equal: bool,
150
coalesce: JoinCoalesce,
151
maintain_order: MaintainOrderJoin,
152
validation: JoinValidation,
153
suffix: Option<PlSmallStr>,
154
},
155
InMemoryJoin {
156
how: PlSmallStr,
157
left_on: Vec<PlSmallStr>,
158
right_on: Vec<PlSmallStr>,
159
nulls_equal: bool,
160
coalesce: JoinCoalesce,
161
maintain_order: MaintainOrderJoin,
162
validation: JoinValidation,
163
suffix: Option<PlSmallStr>,
164
slice: Option<(i64, u64)>,
165
},
166
InMemoryAsOfJoin {
167
left_on: PlSmallStr,
168
right_on: PlSmallStr,
169
left_by: Option<Vec<PlSmallStr>>,
170
right_by: Option<Vec<PlSmallStr>>,
171
strategy: polars_ops::frame::AsofStrategy,
172
/// [value, dtype_str]
173
tolerance: Option<[PlSmallStr; 2]>,
174
suffix: Option<PlSmallStr>,
175
slice: Option<(i64, u64)>,
176
coalesce: JoinCoalesce,
177
allow_eq: bool,
178
check_sortedness: bool,
179
},
180
InMemoryIEJoin {
181
left_on: Vec<PlSmallStr>,
182
right_on: Vec<PlSmallStr>,
183
inequality_operators: Vec<polars_ops::frame::InequalityOperator>,
184
suffix: Option<PlSmallStr>,
185
slice: Option<(i64, u64)>,
186
},
187
Map {
188
display_str: PlSmallStr,
189
format_str: PlSmallStr,
190
},
191
MultiScan {
192
scan_type: PlSmallStr,
193
num_sources: u64,
194
first_source: Option<PlSmallStr>,
195
projected_file_columns: Vec<PlSmallStr>,
196
file_projection_builder_type: PlSmallStr,
197
row_index_name: Option<PlSmallStr>,
198
row_index_offset: Option<u64>,
199
pre_slice: Option<(i64, u64)>,
200
predicate: Option<PlSmallStr>,
201
predicate_file_skip_applied: Option<PredicateFileSkip>,
202
has_table_statistics: bool,
203
include_file_paths: Option<PlSmallStr>,
204
deletion_files_type: Option<PlSmallStr>,
205
hive_columns: Option<Vec<PlSmallStr>>,
206
},
207
Multiplexer,
208
NegativeSlice {
209
offset: i64,
210
length: u64,
211
},
212
OrderedUnion {
213
num_inputs: u64,
214
},
215
PartitionSink {
216
base_path: PlSmallStr,
217
file_path_callback: Option<PlSmallStr>,
218
partition_variant: PlSmallStr,
219
partition_variant_max_size: Option<u64>,
220
partition_variant_key_exprs: Option<Vec<PlSmallStr>>,
221
partition_variant_include_key: Option<bool>,
222
file_type: PlSmallStr,
223
per_partition_sort_exprs: Option<Vec<PlSmallStr>>,
224
per_partition_sort_descending: Option<Vec<bool>>,
225
per_partition_sort_nulls_last: Option<Vec<bool>>,
226
finish_callback: Option<PlSmallStr>,
227
sync_on_close: SyncOnCloseType,
228
maintain_order: bool,
229
mkdir: bool,
230
},
231
PartitionSink2 {
232
base_path: PlSmallStr,
233
file_path_provider: FileProviderType,
234
file_format: PlSmallStr,
235
partition_strategy: PlSmallStr,
236
partition_key_exprs: Option<Vec<PlSmallStr>>,
237
include_keys: Option<bool>,
238
per_partition_sort_exprs: Option<Vec<PlSmallStr>>,
239
per_partition_sort_descending: Option<Vec<bool>>,
240
per_partition_sort_nulls_last: Option<Vec<bool>>,
241
mkdir: bool,
242
maintain_order: bool,
243
sync_on_close: SyncOnCloseType,
244
cloud_options: bool,
245
max_rows_per_file: u64,
246
approximate_bytes_per_file: u64,
247
},
248
PeakMin,
249
PeakMax,
250
Reduce {
251
exprs: Vec<PlSmallStr>,
252
},
253
Repeat,
254
Rle,
255
RleId,
256
Select {
257
selectors: Vec<PlSmallStr>,
258
extend_original: bool,
259
},
260
Shift {
261
has_fill: bool,
262
},
263
SimpleProjection {
264
columns: Vec<PlSmallStr>,
265
},
266
SinkMultiple {
267
num_sinks: u64,
268
},
269
Sort {
270
by_exprs: Vec<PlSmallStr>,
271
slice: Option<(i64, u64)>,
272
descending: Vec<bool>,
273
nulls_last: Vec<bool>,
274
multithreaded: bool,
275
maintain_order: bool,
276
limit: Option<u64>,
277
},
278
Slice {
279
offset: i64,
280
length: u64,
281
},
282
TopK {
283
by_exprs: Vec<PlSmallStr>,
284
reverse: Vec<bool>,
285
nulls_last: Vec<bool>,
286
},
287
WithRowIndex {
288
name: PlSmallStr,
289
offset: Option<u64>,
290
},
291
Zip {
292
num_inputs: u64,
293
zip_behavior: ZipBehavior,
294
},
295
//
296
// Feature gated
297
//
298
#[cfg(feature = "cum_agg")]
299
CumAgg {
300
kind: PlSmallStr,
301
},
302
#[cfg(feature = "ewma")]
303
Ewm {
304
variant: PlSmallStr,
305
alpha: f64,
306
adjust: bool,
307
bias: bool,
308
min_periods: usize,
309
ignore_nulls: bool,
310
},
311
#[cfg(feature = "semi_anti_join")]
312
SemiAntiJoin {
313
left_on: Vec<PlSmallStr>,
314
right_on: Vec<PlSmallStr>,
315
nulls_equal: bool,
316
output_as_bool: bool,
317
},
318
#[cfg(feature = "merge_sorted")]
319
MergeSorted,
320
#[cfg(feature = "python")]
321
PythonScan {
322
scan_source_type: polars_plan::prelude::python_dsl::PythonScanSource,
323
n_rows: Option<u64>,
324
projection: Option<Vec<PlSmallStr>>,
325
predicate: Option<PlSmallStr>,
326
schema_names: Vec<PlSmallStr>,
327
is_pure: bool,
328
validate_schema: bool,
329
},
330
}
331
332