Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/client/check.rs
6939 views
1
use polars_core::error::{PolarsResult, polars_err};
2
3
use crate::constants::POLARS_PLACEHOLDER;
4
use crate::dsl::{DslPlan, FileScanDsl, ScanSources, SinkType};
5
6
/// Assert that the given [`DslPlan`] is eligible to be executed on Polars Cloud.
7
pub(super) fn assert_cloud_eligible(dsl: &DslPlan) -> PolarsResult<()> {
8
if std::env::var("POLARS_SKIP_CLIENT_CHECK").as_deref() == Ok("1") {
9
return Ok(());
10
}
11
12
// Check that the plan ends with a sink.
13
if !matches!(dsl, DslPlan::Sink { .. }) {
14
return ineligible_error("does not contain a sink");
15
}
16
17
for plan_node in dsl.into_iter() {
18
match plan_node {
19
#[cfg(feature = "python")]
20
DslPlan::PythonScan { .. } => (),
21
DslPlan::Scan {
22
sources, scan_type, ..
23
} => {
24
match sources {
25
ScanSources::Paths(addrs) => {
26
if addrs
27
.iter()
28
.any(|p| !p.is_cloud_url() && p.to_str() != POLARS_PLACEHOLDER)
29
{
30
return ineligible_error("contains scan of local file system");
31
}
32
},
33
ScanSources::Files(_) => {
34
return ineligible_error("contains scan of opened files");
35
},
36
ScanSources::Buffers(_) => {
37
return ineligible_error("contains scan of in-memory buffer");
38
},
39
}
40
41
if matches!(&**scan_type, FileScanDsl::Anonymous { .. }) {
42
return ineligible_error("contains anonymous scan");
43
}
44
},
45
DslPlan::Sink { payload, .. } => {
46
match payload {
47
SinkType::Memory => {
48
return ineligible_error("contains memory sink");
49
},
50
SinkType::File(_) => {
51
// The sink destination is passed around separately, can't check the
52
// eligibility here.
53
},
54
SinkType::Partition(_) => {
55
return ineligible_error("contains partition sink");
56
},
57
}
58
},
59
DslPlan::SinkMultiple { .. } => {
60
return ineligible_error("contains sink multiple");
61
},
62
_ => (),
63
}
64
}
65
Ok(())
66
}
67
68
fn ineligible_error(message: &str) -> PolarsResult<()> {
69
Err(polars_err!(
70
InvalidOperation:
71
"logical plan ineligible for execution on Polars Cloud: {message}"
72
))
73
}
74
75
impl DslPlan {
    /// Append the direct input plans of this node to `scratch`.
    ///
    /// Nodes without inputs (scans) leave `scratch` untouched. Nothing is removed from
    /// `scratch`; inputs are only pushed onto its end.
    fn inputs<'a>(&'a self, scratch: &mut Vec<&'a DslPlan>) {
        match self {
            // Leaf nodes: nothing to descend into.
            Self::Scan { .. } | Self::DataFrameScan { .. } => (),
            #[cfg(feature = "python")]
            Self::PythonScan { .. } => (),

            // Nodes with exactly one input.
            Self::Select { input, .. }
            | Self::GroupBy { input, .. }
            | Self::Filter { input, .. }
            | Self::Distinct { input, .. }
            | Self::Sort { input, .. }
            | Self::Slice { input, .. }
            | Self::HStack { input, .. }
            | Self::MatchToSchema { input, .. }
            | Self::PipeWithSchema { input, .. }
            | Self::MapFunction { input, .. }
            | Self::Sink { input, .. }
            | Self::Cache { input, .. } => scratch.push(input),
            Self::IR { dsl, .. } => scratch.push(dsl),

            // Nodes with a left and a right input.
            Self::Join {
                input_left,
                input_right,
                ..
            } => {
                scratch.push(input_left);
                scratch.push(input_right);
            },
            #[cfg(feature = "merge_sorted")]
            Self::MergeSorted {
                input_left,
                input_right,
                ..
            } => {
                scratch.push(input_left);
                scratch.push(input_right);
            },

            // Nodes with a variable number of inputs.
            Self::Union { inputs, .. }
            | Self::HConcat { inputs, .. }
            | Self::SinkMultiple { inputs } => scratch.extend(inputs),
            Self::ExtContext { input, contexts } => {
                scratch.push(input);
                scratch.extend(contexts);
            },
        }
    }
}
122
123
pub struct DslPlanIter<'a> {
124
stack: Vec<&'a DslPlan>,
125
}
126
127
impl<'a> Iterator for DslPlanIter<'a> {
128
type Item = &'a DslPlan;
129
130
fn next(&mut self) -> Option<Self::Item> {
131
self.stack
132
.pop()
133
.inspect(|next| next.inputs(&mut self.stack))
134
}
135
}
136
137
impl<'a> IntoIterator for &'a DslPlan {
138
type Item = &'a DslPlan;
139
type IntoIter = DslPlanIter<'a>;
140
141
fn into_iter(self) -> Self::IntoIter {
142
DslPlanIter { stack: vec![self] }
143
}
144
}
145
146