Book a Demo!
CoCalc Logo Icon
Store Features Docs Share Support News About Policies Sign Up Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/client/check.rs
8424 views
1
use polars_core::error::{PolarsResult, polars_err};
2
3
use crate::constants::POLARS_PLACEHOLDER;
4
use crate::dsl::{DslPlan, FileScanDsl, ScanSources, SinkType};
5
6
/// Assert that the given [`DslPlan`] is eligible to be executed on Polars Cloud.
7
pub(super) fn assert_cloud_eligible(dsl: &DslPlan, allow_local_scans: bool) -> PolarsResult<()> {
8
if std::env::var("POLARS_SKIP_CLIENT_CHECK").as_deref() == Ok("1") {
9
return Ok(());
10
}
11
12
// Check that the plan ends with a sink.
13
if !matches!(dsl, DslPlan::Sink { .. } | DslPlan::SinkMultiple { .. }) {
14
return ineligible_error("does not contain a sink");
15
}
16
17
for plan_node in dsl.into_iter() {
18
match plan_node {
19
#[cfg(feature = "python")]
20
DslPlan::PythonScan { .. } => (),
21
DslPlan::Scan {
22
sources, scan_type, ..
23
} => {
24
match sources {
25
ScanSources::Paths(paths) => {
26
if !allow_local_scans
27
&& paths
28
.iter()
29
.any(|p| !p.has_scheme() && p.as_str() != POLARS_PLACEHOLDER)
30
{
31
return ineligible_error("contains scan of local file system");
32
}
33
},
34
ScanSources::Files(_) => {
35
return ineligible_error("contains scan of opened files");
36
},
37
ScanSources::Buffers(_) => {
38
return ineligible_error("contains scan of in-memory buffer");
39
},
40
}
41
42
if matches!(&**scan_type, FileScanDsl::Anonymous { .. }) {
43
return ineligible_error("contains anonymous scan");
44
}
45
},
46
DslPlan::Sink { payload, .. } => {
47
match payload {
48
SinkType::Memory => {
49
return ineligible_error("contains memory sink");
50
},
51
SinkType::Callback(_) => {
52
return ineligible_error("contains callback sink");
53
},
54
SinkType::File { .. } | SinkType::Partitioned { .. } => {
55
// The sink destination is passed around separately, can't check the
56
// eligibility here.
57
},
58
}
59
},
60
_ => (),
61
}
62
}
63
Ok(())
64
}
65
66
fn ineligible_error(message: &str) -> PolarsResult<()> {
67
Err(polars_err!(
68
InvalidOperation:
69
"logical plan ineligible for execution on Polars Cloud: {message}"
70
))
71
}
72
73